import kafka.serializer.StringDecoder
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Create by jenrey on 2018/5/27 21:07
  */
object AdvApplicationTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("AdvApplicationTest")
    conf.setMaster("local")
    conf.set("", "") //序列化
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))

    /**
      * TODO:第一步：从kafka获取数据（direct 方式）
      */
    /* K: ClassTag,
       V: ClassTag,
       KD <: Decoder[K]: ClassTag,
       VD <: Decoder[V]: ClassTag] (
       ssc: StreamingContext,
       kafkaParams: Map[String, String],
       topics: Set[String]*/
    val kafkaParams = Map("metadata.broker.list" -> "hadoop04:9092")
    val topics = Set("aura")
    val logDStream: DStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)

    //TODO:如果【一个用户】【一天内】对【某个广告】点击的次数超过了【100次】，这样的用户属于黑名单用户，这样的数据就不统计了
    /**
      * TODO:第二步：进行黑名单过滤
      */
    val filterLogDStream: DStream[String] = blackListFileter(logDStream, ssc)

    /**
      * TODO:第三步：动态生成黑名单  实时生成黑名单
      */

    /**
      * TODO:第四步：实时统计每天各省各城市广告点击量
      */
    /**
      * TODO:第五步：实时统计每天各省热门广告点击量
      */
    /**
      * TODO:第六步：实时统计每天每个广告在最近一小时的滑动窗口的点击趋势
      */
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }

  /**
    * 对黑名单进行过滤的方法
    *
    * @param logDStream 从kafka读取数据
    * @return 进行黑名单过滤以后的数据
    */
  def blackListFileter(logDStream: DStream[String], ssc: StreamingContext): DStream[String] = {
    //这个地方的黑名单，应该是从我们持久化的数据库里面读取的：有三个数据库是我们常用的（Redis，hbase，mysql）
    val blackList = List((1L, true), (2L, true), (3L, true))
    //把黑名单转化成RDD
    val blackListRDD: RDD[(Long, Boolean)] = ssc.sparkContext.parallelize(blackList)
    //广播黑名单
    val blackListBroadcast: Broadcast[Array[(Long, Boolean)]] = ssc.sparkContext.broadcast(blackListRDD.collect())
    //transform对传进来的DStream中的每一个RDD进行操作
    logDStream.transform(rdd => {
      //把传进来的数据切分，组成kv形式
      val user_lineRDD: RDD[(Long, String)] = rdd.map(line => {
        val fields: Array[String] = line.split(",")
        (fields(3).toLong, line)
      })
      //注意广播出去后，需要使用.value来获取播放值
      val blackRDD: RDD[(Long, Boolean)] = rdd.sparkContext.parallelize(blackListBroadcast.value)
      /**
        * List((22L, "qwe"), (2L, "asd"), (3L, "zxc"))
        * List((1L, true), (2L, true), (3L, true))
        * leftOuterJoin 后的结果如下，此算子必须都是kv形式才行
        * (22,(qwe,None))
        * (3,(zxc,Some(true)))
        * (2,(asd,Some(true)))
        */
      val resultRDD: RDD[(Long, (String, Option[Boolean]))] = user_lineRDD.leftOuterJoin(blackRDD)
      //这个是返回值，返回进行黑名单过滤以后的数据
      resultRDD.filter(tuple => {
        tuple._2._2.isEmpty
      }).map(_._2._1)
    })
  }
}
